package cn.zhaopin.starter.mq.config;

import cn.zhaopin.starter.mq.constant.MQConstant;
import cn.zhaopin.starter.mq.core.PulsarTemplate;
import cn.zhaopin.starter.mq.properties.MQProperties;
import cn.zhaopin.starter.mq.properties.PulsarProperties;
import cn.zhaopin.starter.mq.support.JSON2Schema;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.env.Environment;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter;

/**
 * Description: Pulsar自动配置
 *
 * @author: zuomin (myleszelic@outlook.com)
 * @date: 2021/07/16-17:06
 */
@Configuration
@ConditionalOnClass({PulsarClient.class})
@ConditionalOnProperty(prefix = MQConstant.PREFIX, value = "type", havingValue = MQConstant.MqType.PULSAR)
@Import({ListenerContainerConfiguration.class})
@EnableConfigurationProperties(value = {MQProperties.class})
public class PulsarAutoConfiguration implements ApplicationContextAware, InitializingBean {

    private static Logger log = LoggerFactory.getLogger(PulsarAutoConfiguration.class);

    public static final String PULSAR_CLIENT_BEAN_NAME = "defaultPulsarClient";
    public static final String PULSAR_TEMPLATE_BEAN_NAME = "pulsarTemplate";

    @Autowired
    private Environment environment;

    private ApplicationContext applicationContext;

    private MessageConverter messageConverter;

    @Bean(PULSAR_CLIENT_BEAN_NAME)
    @ConditionalOnMissingBean(PulsarClient.class)
    public PulsarClient client(MQProperties mqProperties) {
        PulsarProperties pulsarProperties = mqProperties.getPulsar();
        try {
            return PulsarClient.builder()
                    .serviceUrl(pulsarProperties.getServiceUrl())
                    .authentication(AuthenticationFactory.token(pulsarProperties.getAuthenticationToken()))
                    .build();
        } catch (PulsarClientException e) {
            e.printStackTrace();
            throw new MessagingException("PulsarClient start fail, " + e.getMessage(), e);
        }
    }

    @Bean(destroyMethod = "destroy")
    @ConditionalOnMissingBean(name = PULSAR_TEMPLATE_BEAN_NAME)
    public PulsarTemplate pulsarTemplate(PulsarClient client, MQProperties mqProperties){
        PulsarTemplate pulsarTemplate = new PulsarTemplate(client, mqProperties.getPulsar());
        pulsarTemplate.setEnvironment(environment);
        pulsarTemplate.initMessageConvert(messageConverter);
        return pulsarTemplate;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        MappingJackson2MessageConverter jackson2MessageConverter = new MappingJackson2MessageConverter();
        jackson2MessageConverter.setObjectMapper(JSON2Schema.getObjectMapper());
        jackson2MessageConverter.setSerializedPayloadClass(String.class);
        messageConverter = jackson2MessageConverter;
    }
}
